set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=50;
set hive.exec.max.dynamic.partitions.pernode=50;

INSERT overwrite TABLE jms_dwd.dwd_tab_end_piece_unprocessed_dt partition(dt)
SELECT id,
       waybill_no,
       end_code,
       end_code_desc,
       SOURCE,
       input_time,
       network_id,
       network_code,
       network_name,
       agency_network_id,
       agency_network_code,
       agency_network_name,
       league_network_id,
       league_network_code,
       league_network_name,
       is_delete,
       end_time,
       update_time,
       sender_network_id,
       sender_network_code,
       sender_network_name,
       scan_by,
       scan_by_code,
       scan_by_name,
       dt
FROM
  (SELECT *,
          row_number() over w rnk
   FROM jms_ods.tab_end_piece
   WHERE dt = '{{ execution_date | cst_ds }}'
     AND date_format(end_time,'yyyy-MM-dd') = '{{ execution_date | cst_ds }}' window w AS (partition BY waybill_no
                                                                        ORDER BY input_time DESC) )tt
WHERE rnk = 1
distribute by 1
;